package com.flowcloud.kafka.exception;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.stereotype.Component;

/**
 * @ClassName CustomListenerErrorListener
 * @Description 自定义消费者异常处理器
 * @Date 2021/7/26/10:58
 * @Author yinpan
 * @Version 1.0
 */
@Slf4j
@Component
public class CustomListenerErrorListener {

	@Bean
	public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
		return (message, e, consumer) -> {
			log.info("consumerAwareErrorHandler receive : " + message.getPayload().toString());
			// offset提交
			consumer.commitAsync();
			return null;
		};
	}
}
